package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark_Stream_Winodw2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("BD2")
    val lines = ssc.socketTextStream("localhost",9999)
    val wordOne = lines.map((_,1))

    /*
    reduceByKeyAndWindow:当窗口时长比较大，但是滑块步长比较小，那么可以采用增加数据和删除的数据的方式
    不会产生重复计算
     */
    val winDS = wordOne.reduceByKeyAndWindow(
      (x: Int, y: Int) => {
        x + y
      },
      (x: Int, y: Int) => {
        x - y
      },
      Seconds(9), Seconds(3)
    )

    //SparkStreaming如果没有任何输出的操作，就会报错
    winDS.foreachRDD(//这里不会打印时间戳
      rdd=>{
        rdd.collect().foreach(println)

      }
    )


    ssc.start()
    ssc.awaitTermination()
  }

}
